package ucloud

import (
	"strings"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	plog "github.com/prometheus/common/log"

	"gitee.com/monitor_dev/ucloud_kafka_exporter/kafka_exporter"
)

// Collector periodically reloads the UCloud Kafka cluster list and keeps the
// set of running kafka exporters in sync with it.
//
// Each cycle it:
//  1. refreshes the resource list (filtered by projectID unless it is "all"),
//  2. starts an exporter for every cluster not yet present in
//     kafka_exporter.ExporterMap,
//  3. unregisters exporters whose cluster no longer appears in the list.
//
// reloadTime is the reload interval in minutes; it is dereferenced on every
// cycle, so the caller may change it at runtime.
func Collector(resource *resource, reloadTime *int, projectID string) {
	defer func() {
		// Capture recover() exactly once: a second call always returns
		// nil, so calling it again inside the log statement would log
		// nothing useful about the panic.
		if r := recover(); r != nil {
			plog.Error("ucloud_kafka collector err", r)
		}
	}()
	// Timer driving the periodic reload; re-armed at the end of each cycle.
	reloadChan := time.NewTimer(time.Second)
	defer reloadChan.Stop()
	for {
		<-reloadChan.C
		plog.Infof("ucloud kafka resource reload")
		if err := resource.Update(); err != nil {
			plog.Error(err.Error(), "ucloud kafka resource update error")
		}
		list := resource.List()
		for _, v := range list {
			// Filter by project unless the caller asked for all projects.
			if projectID != "all" && v.Zone.ProjectId != projectID {
				continue
			}
			// Start an exporter only for clusters we are not already exporting.
			if _, ok := kafka_exporter.ExporterMap[v.ClusterInstanceId]; !ok {
				go func(v *UkafkaCluster) {
					labels := strings.Join([]string{
						"zone=" + v.Zone.Zone,
						"region=" + v.Zone.Region,
						"project_name=" + v.Zone.ProjectName,
						"project_id=" + v.Zone.ProjectId,
						"framework_version=" + v.FrameworkVersion,
						"cluster_id=" + v.ClusterInstanceId,
						"cluster_name=" + v.ClusterInstanceName,
					},
						",",
					)
					opts := kafka_exporter.NewOpt(v.BrokerIP, v.ZookeeperIP, labels, v.FrameworkVersion)
					// NOTE(review): `down` is never written to or closed
					// here — confirm SimpleKafkaExporter owns its lifecycle.
					down := make(chan int)
					go kafka_exporter.SimpleKafkaExporter(*opts, ".*", ".*", down, v.ClusterInstanceId)
				}(v)
			}
		}
		// Unregister exporters whose cluster disappeared from the new list.
		// A direct map lookup replaces the previous found-flag scan: the flag
		// was declared outside the loop and never reset, so once it turned
		// true nothing was ever unregistered again.
		for oldId, exporter := range kafka_exporter.ExporterMap {
			if _, ok := list[oldId]; !ok {
				prometheus.Unregister(exporter)
				plog.Info("Unregister :" + oldId)
				// NOTE(review): the entry is not deleted from ExporterMap,
				// so a cluster that reappears will not be re-registered —
				// confirm whether SimpleKafkaExporter removes it via `down`.
			}
		}
		plog.Info("collect kafka resource", kafka_exporter.ExporterMap)
		// Re-arm the timer with the (possibly updated) reload interval.
		reloadChan.Reset(time.Minute * time.Duration(*reloadTime))
	}
}
